home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
EnigmA Amiga Run 1997 February
/
EnigmA AMIGA RUN 15 (1997)(G.R. Edizioni)(IT)[!][issue 1997-02][PLANET CD V].iso
/
enigma
/
earcd
/
utility
/
utilmisc
/
queue.lzh
/
queue_3.1
/
queue_test.c
< prev
next >
Wrap
C/C++ Source or Header
|
1996-11-28
|
7KB
|
331 lines
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <exec/exec.h>
#include <proto/exec.h>
#include <proto/dos.h>
#include "queue.h"
#include "queue_library.h" /* so we can look at internals */
#if 1
#define LISTEN_REOPEN
#define SEND_REOPEN
#endif
#if defined (__GNUC__)
#include "queue_inline.h"
#elif defined (__SASC)
#include "queue_pragmas.h"
#endif
#if !defined (__SASC)
struct ExecBase *SysBase = NULL;
struct DosLibrary *DOSBase = NULL;
#endif
struct Library *QueueBase = NULL;
ULONG sigbit = -1;
QMessage *Qmsg = NULL;
#define QMSG_DATA_SIZE 1
#define PID(msg) (*(ULONG *)(msg->qm_Data))
void
Block (int *abort)
{
int r;
if (r = rand () % 3)
Delay (r * 5);
if (SetSignal (0, SIGBREAKF_CTRL_C) & SIGBREAKF_CTRL_C)
*abort = 1;
}
void
cleanup (void)
{
if (sigbit != -1)
{
FreeSignal (sigbit);
}
#if !defined (__SASC)
if (DOSBase)
{
CloseLibrary ((struct Library *) DOSBase);
}
#endif
if (QueueBase)
{
if (Qmsg)
QFreeMsg (Qmsg, QMSG_DATA_SIZE);
CloseLibrary (QueueBase);
}
}
void
PrintQueue (QHandle *qhandle)
{
QueueNode *qn = ((QueueHandle *) qhandle) -> qh_QNode;
QueueHandle *qh = (QueueHandle *) qhandle;
char s[512], *ptr;
QMessage *msg, *next, *err_msg = NULL;
int count = 0, replies, qnread;
// ObtainSemaphore (qn -> qn_Semaphore);
Forbid ();
ptr = s;
msg = (QMessage *) qn -> qn_List.lh_Head;
while (next = (QMessage *) msg -> qm_MinNode.mln_Succ)
{
switch (msg -> qm_Status)
{
case QMS_ACTIVE:
sprintf (ptr, "[%da:%d],", PID (msg), msg -> qm_Replies); break;
case QMS_REMOVED:
sprintf (ptr, "[%dr:%d],", PID (msg), msg -> qm_Replies); break;
case QMS_MARKER:
sprintf (ptr, "[m],"); break;
default:
sprintf (ptr, "[** %x **],", msg -> qm_Status); break;
}
if (msg -> qm_Status == QMS_ACTIVE ||
msg -> qm_Status == QMS_REMOVED)
{
if (msg -> qm_Replies > qn -> qn_Read)
err_msg = msg;
}
ptr = s + strlen (s);
msg = next;
if (count ++ > 42)
break;
}
if (qh -> qh_Mode == QMODE_LISTEN)
{
if (msg = qh -> qh_un.qhl.qhl_Message)
sprintf (ptr, " current = %d.\n", PID (msg));
else
sprintf (ptr, " current = <null>.\n");
}
else
{
sprintf (ptr, " qn_Read = %d.\n", qn -> qn_Read);
}
if (err_msg)
{
replies = err_msg -> qm_Replies;
qnread = qn -> qn_Read;
}
Permit ();
// ReleaseSemaphore (qn -> qn_Semaphore);
printf (s);
if (err_msg)
{
printf ("\nToo many replies: %d (qn_Read = %d).\n", replies, qnread);
while (1)
Delay (100);
}
}
int
main (int argc, char *argv[])
{
ULONG sigmask;
QMessage *msg = NULL;
QHandle qh = NULL;
char *qn, *queuename = "testqueue";
int r, abort = 0, reopen = 1;
ULONG pid = 0;
if (argc == 2)
{
pid = ((struct Process *) FindTask (0)) -> pr_TaskNum;
printf ("Server ( pid = %ld ).\n", pid);
}
else
printf ("Client.\n", pid);
if (qn = getenv ("TESTQUEUE"))
{
printf ("Queue = %s.\n", qn);
queuename = qn;
}
#if !defined (__SASC)
SysBase = *(struct ExecBase **)4;
if (!(DOSBase = (struct DosLibrary *) OpenLibrary ("dos.library", 37)))
return 20;
#endif
atexit (cleanup);
if ((sigbit = AllocSignal (-1)) == -1)
{
printf ("Failed to allocate signal.\n");
return 20;
}
sigmask = 1 << sigbit;
if (!(QueueBase = OpenLibrary ("queue.library", 0)))
{
printf ("Failed to open queue.library.\n");
return 20;
}
if (!(Qmsg = QAllocMsg ( QMSG_DATA_SIZE )))
{
printf ("Couldn't allocate message.\n");
return 20;
}
if (!pid) /* Client */
{
r = time (NULL);
srand (r);
while (1)
{
if (!abort && !reopen)
{
printf ("Waiting ...\n");
abort = Wait (sigmask | SIGBREAKF_CTRL_C) & SIGBREAKF_CTRL_C;
printf ("Signal.\n");
}
if (abort || reopen)
{
if (qh)
{
Forbid ();
while (msg = QGetMsg (qh));
QClose (qh);
Permit ();
if (abort)
{
printf ("Exiting.\n");
return 0;
}
}
}
if (reopen)
{
Delay (50);
if (!(qh = QOpen (queuename, QMODE_LISTEN, sigbit)))
{
printf ("Failed to open \"%s\" queue.\n", queuename);
return 20;
}
printf ("Queue \"%s\" opened.\n", queuename);
reopen = 0;
}
PrintQueue (qh);
if (rand () > RAND_MAX/2) /*** Process all messages ***/
{
r = 0;
while (msg = QGetMsg (qh))
{
printf ("Got message from %d.\n", PID (msg));
r ++;
}
if (!r)
printf ("Got no message(s).\n");
}
else /*** Reply one message & block ***/
{
if (msg = QGetMsg (qh))
printf ("Got message from %d (reply, block).\n", PID (msg));
else
printf ("Got no message (reply, block).\n");
if (rand () > RAND_MAX/2)
{
QReplyMsg (qh);
Block (&abort);
}
else
{
Block (&abort);
QReplyMsg (qh);
}
}
#ifdef LISTEN_REOPEN
if (rand () < RAND_MAX/10)
reopen = 1;
#endif
}
}
else
{
PID (Qmsg) = pid;
while (1)
{
if (!abort && !reopen)
{
printf ("Waiting ...\n");
abort = Wait (sigmask | SIGBREAKF_CTRL_C) & SIGBREAKF_CTRL_C;
printf ("Signal.\n");
}
if (abort || reopen)
{
if (qh)
{
while ((r = QClose (qh)) > 0)
{
printf ("Cannot exit: %d message(s) left in queue.\n", r);
PrintQueue (qh);
Delay (10);
QGetMsg (qh);
}
if (abort)
{
printf ("All done.\n");
return 0;
}
}
}
if (reopen)
{
if (!(qh = QOpen (queuename, QMODE_SEND, sigbit)))
{
printf ("Failed to open \"%s\" queue.\n", queuename);
return 20;
}
printf ("Queue \"%s\" opened.\n", queuename);
msg = Qmsg;
reopen = 0;
}
else
{
msg = QGetMsg (qh);
if (msg == Qmsg)
printf ("Got my message (%d).\n", PID (msg));
else if (msg)
printf ("Got wrong message (%d).\n", PID (msg));
else
printf ("Got no message.\n");
Block (&abort);
}
if (!abort)
{
#ifdef SEND_REOPEN /* ERROR */
if (rand () < RAND_MAX/10)
reopen = 1;
#endif
if (msg == Qmsg)
{
printf ("Sending message (%d): ", PID (msg));
QAddMsg (qh, Qmsg);
PrintQueue (qh);
}
}
}
}
}